#ifndef __MQ_BROKER_
#define __MQ_BROKER_

#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include "../mqcommon/mq_threadpool.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/mq_log.hpp"
#include "mq_connection.hpp"
#include "mq_consumer.hpp"
#include "mq_virtualhost.hpp"

namespace MQ
{

    class Server
    {
    public:
        using MessagePtr = std::shared_ptr<google::protobuf::Message>;

        Server(int port, const std::string &basedir)
            : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "server", muduo::net::TcpServer::kReusePort),
              _dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
              _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
              _vp(std::make_shared<VirtualHost>("virtualhost", basedir, basedir + "/meta.db")),
              _consumer_manager(std::make_shared<ConsumerManager>()),
              _connection_manager(std::make_shared<ConnectionManager>()),
              _tp(std::make_shared<ThreadPool>())
        {
            QueueMap qm = _vp->allQueue();
            for (auto &q : qm)
            {
                _consumer_manager->initQueueConsumer(q.first);
            }

            // 注册函数
            _dispatcher.registerMessageCallback<MQ::openChannelRequest>(std::bind(&Server::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::queueBindRequest>(std::bind(&Server::onQueueBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::basicAckRequest>(std::bind(&Server::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<MQ::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            // server注册函数
            _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
        }

        void start()
        {
            _server.start();
            _baseloop.loop();
        }

    private:
        void onConnection(const muduo::net::TcpConnectionPtr &conn)
        {
            if (conn->connected())
            {
                _connection_manager->newConnection(conn, _vp, _consumer_manager, _codec, _tp);
            }
            else
            {
                _connection_manager->delConnection(conn);
            }
        }

        void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
        {
            LOG(WARNING, "onUnknownMessag:%s\n", message->GetTypeName());
            conn->shutdown();
        }

        void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "打开信道时,没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            mconn->openChannel(message);
        }

        void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "关闭信道,没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            mconn->closeChannel(message);
        }

        void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "声明交换机时,没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "声明交换机时，没有找到信道！");
                return;
            }
            cp->declareExchange(message);
        }

        void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "删除交换机时,没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "删除交换机时，没有找到信道！");
                return;
            }
            cp->deleteExchange(message);
        }

        void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "声明队列时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "声明队列时，没有找到信道！");
                return;
            }
            cp->declareQueue(message);
        }

        void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "删除队列时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "删除队列时，没有找到信道！");
                return;
            }
            cp->deleteQueue(message);
        }

        void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "队列绑定时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "队列绑定时，没有找到信道！");
                return;
            }
            cp->queueBind(message);
        }

        void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "队列解除绑定时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "队列解除绑定时，没有找到信道！");
                return;
            }
            cp->queueUnBind(message);
        }

        void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "发布消息时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "发布消息时，没有找到信道！");
                return;
            }
            cp->basicPublish(message);
        }

        void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "确认消息时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "确认消息时，没有找到信道！");
                return;
            }
            cp->basicAck(message);
        }

        void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "队列消息订阅时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "队列消息订阅时，没有找到信道！");
                return;
            }
            cp->basicConsume(message);
        }

        void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr &message, muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn.get() == nullptr)
            {
                LOG(ERROR, "队列消息取消订阅时，没有找到连接对应的Connection对象！");
                conn->shutdown();
                return;
            }
            Channel::ptr cp = mconn->getChannel(message->cid());
            if (cp.get() == nullptr)
            {
                LOG(ERROR, "队列消息取消订阅时，没有找到信道！");
                return;
            }
            cp->basicCancel(message);
        }

    private:
        muduo::net::EventLoop _baseloop;
        muduo::net::TcpServer _server;
        ProtobufDispatcher _dispatcher;
        ProtobufCodecPtr _codec;
        VirtualHost::ptr _vp;
        ChannelManager::ptr _channel_manager;
        ConsumerManager::ptr _consumer_manager;
        ConnectionManager::ptr _connection_manager;
        ThreadPool::ptr _tp;
    };
}

#endif